package com.heima.article.listener;

import com.alibaba.fastjson.JSON;
import com.heima.article.stream.KafkaStreamListener;
import com.heima.common.constants.article.HotArticleConstants;
import com.heima.model.article.mess.ArticleVisitStreamMess;
import com.heima.model.mess.app.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.UUID;

/**
 * 热门文章流式聚合计算监听器
 */
@Component
@Slf4j
public class HotArticleStreamListener  implements KafkaStreamListener<KStream<String,String>> {

    @Override
    public String listenerTopic() { //入口TOPIC
        return HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC;
    }

    @Override
    public String sendTopic() { //出口topic
        return HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC;
    }

    @Override
    public KStream<String, String> getService(KStream<String, String> kStream) {

        KStream<String, String> map = kStream.flatMapValues(value -> {
            UpdateArticleMess updateArticleMess = JSON.parseObject(value, UpdateArticleMess.class);
            //这里值的格式是：  1234:LIKES 或者 1234:VIEWS
            System.out.println("第一阶段  value:" +  updateArticleMess.getArticleId() + ":" + updateArticleMess.getType());
            return Arrays.asList(updateArticleMess.getArticleId() + ":" + updateArticleMess.getType());

        }).map((key, value) -> {
            System.out.println("第二阶段  value:" + value);
            return new KeyValue<>(value, null);

        }).groupByKey()
                .windowedBy(TimeWindows.of(10000))
                .count(Materialized.as("hot-compute-" + UUID.randomUUID().toString()))
                .toStream()
                .map((key, value) -> {
                    return new KeyValue<>(key.key().toString(), handleData(key.key().toString(), value.toString()));
                });
        return map;
    }

    private String handleData(String key, String value) {
        System.out.println("第三阶段  key:" + key +  ", value:"+ value);
        String articleId = key.split(":")[0]; //文章ID
        String type = key.split(":")[1]; //操作行为类型
        Long count = Long.valueOf(value);//聚合之后的总数

        ArticleVisitStreamMess articleVisitStreamMess = new ArticleVisitStreamMess();
        articleVisitStreamMess.setArticleId(Long.valueOf(articleId));
        if(type.equals(UpdateArticleMess.UpdateArticleType.LIKES.name())){
            articleVisitStreamMess.setLike(count.intValue());
        }
        if(type.equals(UpdateArticleMess.UpdateArticleType.VIEWS.name())){
            articleVisitStreamMess.setView(count.intValue());
        }
        if(type.equals(UpdateArticleMess.UpdateArticleType.COLLECTION.name())){
            articleVisitStreamMess.setCollect(count.intValue());
        }
        if(type.equals(UpdateArticleMess.UpdateArticleType.COMMENT.name())){
            articleVisitStreamMess.setComment(count.intValue());
        }
        return JSON.toJSONString(articleVisitStreamMess);
    }
}
